


#include "qe_def.h"
#include "qe_memory.h"
#include "qe_service.h"
#include "qe_time.h"
#include "qe_queue.h"



/**
 * @brief Is the priority queue full
 *
 * @param q : priority queue
 * @return qe_bool : true when the stored element count equals the capacity
 */
qe_bool qe_minpq_isfull(qe_minpq *q)
{
    return (q->cnt_elems == q->num_elems);
}

/**
 * @brief Is the priority queue empty
 *
 * @param q : priority queue
 * @return qe_bool : true when no element is stored
 */
qe_bool qe_minpq_isempty(qe_minpq *q)
{
    return !q->cnt_elems;
}

/**
 * @brief Get the number of elements currently stored in the priority queue
 *
 * @param q : priority queue
 * @return number of stored elements
 */
unsigned int qe_minpq_wait(qe_minpq *q)
{
    unsigned int pending = q->cnt_elems;

    return pending;
}

/**
 * @brief Create a min priority queue inside a single heap allocation
 *
 * The queue header, the priority array and the element storage are laid
 * out back to back in one block obtained from qe_malloc(). Both arrays get
 * num_elems + 1 slots because the heap routines in this file index from 1
 * and leave slot 0 unused.
 *
 * @param elem_size  : size of one element in bytes
 * @param num_elems  : maximum number of elements
 * @param comparator : optional ordering callback; when QE_NULL the integer
 *                     priorities are compared instead
 * @return qe_minpq* : the new queue, or QE_NULL when the allocation fails
 */
qe_minpq *qe_minpq_create(unsigned int elem_size, unsigned int num_elems, 
	qe_bool (*comparator)(void *, void *))
{
	unsigned int slots = num_elems + 1;
	qe_minpq *q;

	/* Size is passed straight through so it is never narrowed to a signed int */
	q = qe_malloc(sizeof(qe_minpq) + (sizeof(unsigned int) + elem_size) * slots);
	if (!q) {
		return QE_NULL;
	}

	q->cnt_elems  = 0;
	q->elem_size  = elem_size;
	q->num_elems  = num_elems;
	/* insert/del test q->flags, so it must not be left uninitialised */
	q->flags      = 0;
	q->comparator = comparator;

	q->prio = (unsigned int *)((char *)q + sizeof(qe_minpq));
	q->data = (char *)q->prio + sizeof(unsigned int) * slots;
	return q;
}

/**
 * @brief Tell whether heap slot i orders after heap slot j
 *
 * When a comparator is installed it is called with the addresses of the
 * two elements; otherwise the stored priorities are compared.
 *
 * @param q : priority queue
 * @param i : first heap slot
 * @param j : second heap slot
 * @return qe_bool : comparator result, or prio[i] > prio[j]
 */
static qe_bool minpq_greater(qe_minpq *q, int i, int j)
{
    if (q->comparator) {
        void *lhs = q->data + q->elem_size * i;
        void *rhs = q->data + q->elem_size * j;

        return q->comparator(lhs, rhs);
    }

    return (q->prio[i] > q->prio[j]);
}

/**
 * @brief Exchange the priority and the element stored in two heap slots
 *
 * The element bytes are swapped in place, so no temporary buffer is
 * allocated and the swap cannot fail.
 *
 * @param q : priority queue
 * @param i : first heap slot
 * @param j : second heap slot
 */
static void minpq_swap(qe_minpq *q, int i, int j)
{
    unsigned int prio_tmp;
    unsigned int n;
    char *a, *b;

    /* Swapping a slot with itself is a no-op */
    if (i == j) {
        return;
    }

    prio_tmp = q->prio[i];
    q->prio[i] = q->prio[j];
    q->prio[j] = prio_tmp;

    a = (char *)q->data + q->elem_size * i;
    b = (char *)q->data + q->elem_size * j;
    for (n = 0; n < q->elem_size; n++) {
        char byte = a[n];

        a[n] = b[n];
        b[n] = byte;
    }
}

/**
 * @brief Move the element at slot k down the heap until neither child
 * orders before it
 *
 * @param q : priority queue
 * @param k : heap slot to sink (children are at 2k and 2k+1)
 */
static void minpq_sink(qe_minpq *q, unsigned int k)
{
    int child;

    for (;;) {
        if ((2 * k) > q->cnt_elems) {
            return;
        }

        /* Pick the child that orders first */
        child = 2 * k;
        if ((child < q->cnt_elems) && minpq_greater(q, child, child + 1)) {
            child += 1;
        }

        if (!minpq_greater(q, k, child)) {
            return;
        }

        minpq_swap(q, k, child);
        k = child;
    }
}

/**
 * @brief Move the element at slot k up the heap while its parent orders
 * after it
 *
 * @param q : priority queue
 * @param k : heap slot to swim (parent is at k/2, root is slot 1)
 */
static void minpq_swim(qe_minpq *q, int k)
{
    while (k > 1) {
        int parent = k / 2;

        if (!minpq_greater(q, parent, k)) {
            break;
        }

        minpq_swap(q, k, parent);
        k = parent;
    }
}

/**
 * @brief Change the capacity of a priority queue
 *
 * New priority and element arrays of capcaity + 1 slots are allocated and
 * slots 0..cnt_elems are copied over. The old arrays are released with
 * qe_free() only when they are separate allocations: a queue built by
 * qe_minpq_create() keeps both arrays inside the queue's own block, and
 * those interior pointers must not be freed.
 *
 * NOTE(review): after a resize the arrays are separate allocations, so
 * whatever destroys the queue has to release them too - confirm against
 * the destroy routine.
 *
 * @param q        : priority queue
 * @param capcaity : new maximum number of elements, must be greater than
 *                   the number of elements currently stored
 * @return qe_ok on success, qe_err_param when capcaity is too small,
 *         qe_err_mem when an allocation fails (the queue is unchanged)
 */
qe_ret qe_minpq_resize(qe_minpq *q, unsigned int capcaity)
{
    void *new_v, *new_p;
    int embedded;

    if (capcaity <= q->cnt_elems) {
        return qe_err_param;
    }

    new_v = qe_malloc(q->elem_size * (capcaity + 1));
    if (!new_v) {
        return qe_err_mem;
    }

    new_p = qe_malloc(sizeof(unsigned int) * (capcaity + 1));
    if (!new_p) {
        /* Do not leak the element array when the second allocation fails */
        qe_free(new_v);
        return qe_err_mem;
    }

    qe_memcpy(new_v, q->data, q->elem_size * (q->cnt_elems + 1));
    qe_memcpy(new_p, q->prio, sizeof(unsigned int) * (q->cnt_elems + 1));

    /* Arrays that live directly behind the header belong to q's own block */
    embedded = ((char *)q->prio == (char *)q + sizeof(qe_minpq));
    if (!embedded) {
        qe_free(q->data);
        qe_free(q->prio);
    }

    q->data = new_v;
    q->prio = (unsigned int *)new_p;
    q->num_elems = capcaity;
    return qe_ok;
}

/**
 * @brief Remove the element at the root of the heap
 *
 * The root element and its priority are copied out, the last element is
 * moved to the root and sunk back into place.
 *
 * @param q : priority queue
 * @param e : receives the removed element (elem_size bytes)
 * @param p : receives the removed element's priority
 * @return qe_ok on success, qe_err_param on a NULL argument,
 *         qe_err_empty when the queue holds no element
 */
qe_ret qe_minpq_delmin(qe_minpq *q, void *e, unsigned int *p)
{
	void *root, *last;

	if (!q || !e || !p) {
		return qe_err_param;
	}

	if (qe_minpq_isempty(q)) {
		return qe_err_empty;
	}

	root = q->data + q->elem_size;
	qe_memcpy(e, root, q->elem_size);
	*p = q->prio[1];

	/* With a single element the root is the last slot: nothing to move,
	 * and copying a region onto itself must be avoided */
	if (q->cnt_elems > 1) {
		q->prio[1] = q->prio[q->cnt_elems];
		last = q->data + q->elem_size * q->cnt_elems;
		qe_memcpy(root, last, q->elem_size);
	}
	q->cnt_elems--;

	minpq_sink(q, 1);

	return qe_ok;
}

/**
 * @brief Insert an element into the priority queue
 *
 * When the queue is full and the QE_MINPQ_F_RES flag is set the capacity
 * is doubled first (an empty capacity grows to 2); without the flag the
 * insert is rejected.
 *
 * @param q : priority queue
 * @param u : element to copy in (elem_size bytes)
 * @param p : priority stored alongside the element
 * @return qe_ok on success, qe_err_full when the queue is full and may not
 *         grow, or the error reported by qe_minpq_resize()
 */
qe_ret qe_minpq_insert(qe_minpq *q, void *u, int p)
{
    void *addr;
    qe_ret ret;

    if (qe_minpq_isfull(q)) {
        if (!qe_mask_exst(q->flags, QE_MINPQ_F_RES)) {
            return qe_err_full;
        }

        /* Bail out if growing failed: writing the new slot would run past
         * the end of the arrays */
        ret = qe_minpq_resize(q, q->num_elems ? q->num_elems * 2 : 2);
        if (ret != qe_ok) {
            return ret;
        }
    }

    q->prio[++q->cnt_elems] = p;
    addr = q->data + q->elem_size * q->cnt_elems;
    qe_memcpy(addr, u, q->elem_size);
    minpq_swim(q, q->cnt_elems);
    return qe_ok;
}

/**
 * @brief Remove the element stored at a given heap slot
 *
 * The element and its priority are copied out, the last element takes the
 * vacated slot and is moved up or down until the heap order holds again.
 * When QE_MINPQ_F_RES is set and the fill level drops to a quarter of the
 * capacity, the queue is shrunk to half its capacity (best effort).
 *
 * @param q     : priority queue
 * @param u     : receives the removed element (elem_size bytes)
 * @param p     : receives the removed element's priority
 * @param index : heap slot to remove, 1 .. number of stored elements
 * @return qe_ok on success, qe_err_empty when the queue is empty,
 *         qe_err_param when index is out of range
 */
qe_ret qe_minpq_del(qe_minpq *q, void *u, unsigned int *p, int index)
{
    void *slot, *last;

    if (qe_minpq_isempty(q)) {
        return qe_err_empty;
    }

    if ((index < 1) || ((unsigned int)index > q->cnt_elems)) {
        return qe_err_param;
    }

    slot = q->data + q->elem_size * index;
    qe_memcpy(u, slot, q->elem_size);
    *p = q->prio[index];

    /* Removing the last slot needs no replacement (and no self-copy) */
    if ((unsigned int)index != q->cnt_elems) {
        q->prio[index] = q->prio[q->cnt_elems];
        last = q->data + q->elem_size * q->cnt_elems;
        qe_memcpy(slot, last, q->elem_size);
    }
    q->cnt_elems--;

    /* The replacement may order before its new parent or after its new
     * children, so restore the heap in both directions */
    if ((unsigned int)index <= q->cnt_elems) {
        minpq_swim(q, index);
        minpq_sink(q, index);
    }

    if ((q->cnt_elems > 0) && 
        (q->cnt_elems == ((q->num_elems - 1) / 4)) &&
        qe_mask_exst(q->flags, QE_MINPQ_F_RES)) {
        /* A failed shrink leaves the queue valid at its old capacity */
        (void)qe_minpq_resize(q, q->num_elems / 2);
    }

    /* Scrub the vacated slot in the arrays that are current now */
    q->prio[q->cnt_elems + 1] = 0;
    last = q->data + q->elem_size * (q->cnt_elems + 1);
    qe_memset(last, 0x0, q->elem_size);
    return qe_ok;
}

/**
 * @brief Copy out the element at a given heap slot without removing it
 *
 * @param q     : priority queue
 * @param u     : receives the element (elem_size bytes)
 * @param p     : receives the element's priority
 * @param index : heap slot to read, 1 .. number of stored elements
 *                (slot 1 is the root)
 * @return qe_ok on success, qe_err_empty when the queue is empty,
 *         qe_err_param when index is out of range
 */
qe_ret qe_minpq_peek(qe_minpq *q, void *u, unsigned int *p, int index)
{
    void *addr;

    if (qe_minpq_isempty(q)) {
        return qe_err_empty;
    }

    /* Slots outside 1..cnt_elems hold no valid element */
    if ((index < 1) || ((unsigned int)index > q->cnt_elems)) {
        return qe_err_param;
    }

    addr = q->data + q->elem_size * index;
    qe_memcpy(u, addr, q->elem_size);
    *p = q->prio[index];
    return qe_ok;
}

/**
 * @brief Create a queue whose storage is allocated together with the
 * queue header
 * 
 * @param elem_size : element size in queue, must be non-zero
 * @param num_elems : maximum number of queue elements, must be non-zero
 * @return qe_queue* : the new queue, or QE_NULL on invalid parameters,
 *                     size overflow or allocation failure
 */
qe_queue *qe_queue_create(unsigned int elem_size, unsigned int num_elems)
{
	qe_queue *q;

	if (!elem_size || !num_elems)
		return QE_NULL;

	/* elem_size * num_elems is computed in unsigned int: refuse sizes
	 * that would wrap and yield an undersized buffer */
	if (num_elems > ((unsigned int)-1) / elem_size)
		return QE_NULL;

	q = (qe_queue *)qe_malloc(sizeof(qe_queue) + elem_size * num_elems);
	if (!q) {return QE_NULL;}

	q->data      = (char *)q + sizeof(qe_queue);
	q->head      = 0;
	q->tail      = 0;
	q->count     = 0;
	q->dynamic   = 1;
	q->elem_size = elem_size;
	q->num_elems = num_elems;

	return q;
}

/**
 * @brief Initialize a queue on top of an external data array
 * 
 * The first count entries of the array are treated as already queued
 * elements, oldest first.
 * 
 * @param q 	    : queue to be initialized
 * @param data      : data array
 * @param elem_size : element size in queue, must be non-zero
 * @param num_elems : maximum number of queue elements, must be non-zero
 * @param count     : valid elements in data array, at most num_elems
 * 
 * @return qe_ok on success, qe_err_param on invalid parameters
 */
qe_ret qe_queue_init(qe_queue *q, void *data, unsigned int elem_size, unsigned int num_elems, 
	unsigned int count)
{
	if (!q || !data || !elem_size || !num_elems || (count > num_elems))
		return qe_err_param;

	q->data      = data;
	q->head      = 0;
	/* The next write goes behind the preloaded elements (wraps to 0 when
	 * the array is already full) so that they are not overwritten */
	q->tail      = count % num_elems;
	q->count     = count;
	q->dynamic   = 0;
	q->elem_size = elem_size;
	q->num_elems = num_elems;

	return qe_ok;
}

/**
 * @brief Check whether the queue holds no element
 * 
 * @param q : queue
 * @return qe_bool : true when the element count is zero
 */
qe_bool qe_queue_isempty(qe_queue *q)
{
	return !q->count;
}

/**
 * @brief Check whether the queue has reached its capacity
 * 
 * @param q : queue
 * @return qe_bool : true when the element count equals the capacity
 */
qe_bool qe_queue_isfull(qe_queue *q)
{
	return (q->num_elems == q->count);
}

/**
 * @brief Get the number of elements waiting in the queue
 * 
 * @param q : queue
 * @return number of queued elements
 */
unsigned int qe_queue_wait(qe_queue *q)
{
	unsigned int pending = q->count;

	return pending;
}

/**
 * @brief Get one element from the given index position of the queue, 
 * but do not remove the element itself
 * 
 * Index 0 is the oldest element (the one qe_queue_deq() would return
 * next). The position is wrapped around the end of the storage.
 * 
 * @param q     : queue to get element
 * @param index : index position, 0 .. number of queued elements - 1
 * @param elem  : element address to copy
 * @return qe_ok on success, qe_err_param on a NULL argument,
 *         qe_err_empty when the queue is empty or holds no element at
 *         that index
 */
qe_ret qe_queue_peek(qe_queue *q, int index, void *elem)
{
	unsigned int slot;
	char *p;

	if (!q || !elem)
		return qe_err_param;

	if (qe_queue_isempty(q) || (index < 0) || ((unsigned int)index >= q->count)) {
		return qe_err_empty;
	}

	/* head + index may run past the end of the storage: wrap it */
	slot = ((unsigned int)q->head + (unsigned int)index) % q->num_elems;
	p = (char *)q->data + slot * q->elem_size;
	qe_memcpy(elem, p, q->elem_size);
	return qe_ok;
}

/**
 * @brief Get multiple elements from the given index position of the queue
 * without removing them
 * 
 * Index 0 is the oldest element. The requested range may wrap around the
 * end of the storage, in which case it is copied in two pieces.
 * 
 * @param q     : queue to get elements
 * @param index : index position of the first element, must not be negative
 * @param elem  : elements address to copy
 * @param num   : elements number
 * @return qe_ok on success, qe_err_param on a NULL argument or when the
 *         range index .. index + num - 1 is not fully queued
 */
qe_ret qe_queue_peek_elems(qe_queue *q, int index, void *elem, unsigned int num)
{
	unsigned int start, first;
	char *dst;

	if (!q || !elem || (index < 0))
		return qe_err_param;

	/* Written as a subtraction so that index + num cannot wrap */
	if (((unsigned int)index > q->count) ||
	    (num > (q->count - (unsigned int)index))) {
		return qe_err_param;
	}

	if (!num)
		return qe_ok;

	start = ((unsigned int)q->head + (unsigned int)index) % q->num_elems;

	/* Elements available before the end of the storage is reached */
	first = q->num_elems - start;
	if (first > num)
		first = num;

	dst = (char *)elem;
	qe_memcpy(dst, (char *)q->data + start * q->elem_size, first * q->elem_size);
	if (num > first) {
		qe_memcpy(dst + first * q->elem_size, (char *)q->data,
			(num - first) * q->elem_size);
	}
	return qe_ok;
}

/**
 * @brief Enqueue one element to queue
 * 
 * @param q    : queue to do enqueue
 * @param elem : element to do enqueue (elem_size bytes are copied)
 * 
 * @return qe_ok on success, qe_err_param on a NULL argument,
 *         qe_err_full when the queue is full
 */
qe_ret qe_queue_enq(qe_queue *q, void *elem)
{
	char *addr;

	/* Report bad arguments with an error code, as qe_queue_deq() does;
	 * QE_NULL is a pointer constant, not a qe_ret value */
	if (!q || !elem) {
		return qe_err_param;
	}

	if (qe_queue_isfull(q)) {
		return qe_err_full;
	}

	addr = (char *)q->data + q->tail * q->elem_size;
	qe_memcpy(addr, elem, q->elem_size);
	q->tail = (q->tail + 1) % q->num_elems;
	q->count++;

	return qe_ok;
}

/**
 * @brief Enqueue multiple elements to queue
 * 
 * Either all num elements are enqueued or none is.
 * 
 * @param q    : queue to do enqueue
 * @param elem : elements to do enqueue, num entries of elem_size bytes
 * @param num  : elements number
 * 
 * @return qe_ok on success, qe_err_param on a NULL argument,
 *         qe_err_notenough when fewer than num slots are free
 */
qe_ret qe_queue_enq_elems(qe_queue *q, void *elem, unsigned int num)
{
	unsigned int avail;
	char *p;

	if (!q || !elem) {
		return qe_err_param;
	}

	/* Free slots = capacity - used; the capacity itself is left untouched */
	avail = q->num_elems - q->count;
	if (avail < num) {
		return qe_err_notenough;
	}

	p = (char *)elem;

	while (num--) {
		qe_queue_enq(q, p);
		p += q->elem_size;
	}

	return qe_ok;
}

/**
 * @brief Remove the oldest element from the queue and copy it out
 * 
 * @param q    : queue to dequeue
 * @param elem : address receiving the element (elem_size bytes)
 * @return qe_ok on success, qe_err_param on a NULL argument,
 *         qe_err_empty when the queue holds no element
 */
qe_ret qe_queue_deq(qe_queue *q, void *elem)
{
	char *src;

	if (!q || !elem) {
		return qe_err_param;
	}

	if (qe_queue_isempty(q)) {
		return qe_err_empty;
	}

	src = (char *)q->data + q->head * q->elem_size;
	qe_memcpy(elem, src, q->elem_size);

	/* Drop the element and step the read position, wrapping at the end */
	q->count--;
	q->head = (q->head + 1) % q->num_elems;

	return qe_ok;
}

/**
 * @brief Dequeue multiple elements from queue
 * 
 * Either all num elements are dequeued or none is.
 * 
 * @param q    : queue
 * @param elem : address receiving the elements, room for num entries of
 *               elem_size bytes
 * @param num  : elements number
 * 
 * @return qe_ok on success, qe_err_param on a NULL argument,
 *         qe_err_notenough when fewer than num elements are queued
 */
qe_ret qe_queue_deq_elems(qe_queue *q, void *elem, unsigned int num)
{
	char *p;

	/* Checked up front: otherwise q is dereferenced unchecked and a NULL
	 * elem would report success without dequeuing anything */
	if (!q || !elem) {
		return qe_err_param;
	}

	if (q->count < num) {
		return qe_err_notenough;
	}

	p = (char *)elem;

	while (num--) {
		qe_queue_deq(q, p);
		p += q->elem_size;
	}

	return qe_ok;
}

static qe_bool qe_worker_comparator(void *d1, void *d2)
{
	qe_wq_worker *w1, *w2;

	w1 = (qe_wq_worker *)d1;
	w2 = (qe_wq_worker *)d2;

	return (w1->call_ts > w2->call_ts);
}

/**
 * @brief Create a work queue
 *
 * The work queue is a min priority queue of qe_wq_worker entries ordered
 * by qe_worker_comparator().
 *
 * @param size : passed to qe_minpq_new() as the number of elements
 * @return qe_workq* : whatever qe_minpq_new() returns
 */
qe_workq *qe_workqueue_new(unsigned int size)
{
	qe_workq *wq = qe_minpq_new(sizeof(qe_wq_worker), size, qe_worker_comparator);

	return wq;
}

/**
 * @brief Schedule a worker on the work queue
 *
 * The worker's call timestamp is ts added to the current qe_time_ms()
 * value.
 *
 * @param q      : work queue
 * @param handle : callback stored in the worker
 * @param ts     : offset added to qe_time_ms()
 * @param data   : user pointer stored in the worker
 * @return result of qe_minpq_insert()
 */
qe_ret qe_workqueue_schedule(qe_workq *q, 
	qe_ret (*handle)(qe_workq * , qe_wq_worker *), qe_u32 ts,
		void *data)
{
	qe_wq_worker job;

	job.call_ts = (qe_u32)(ts + qe_time_ms());
	job.handle  = handle;
	job.data    = data;

	/* Priority argument is 0: this queue is ordered by its comparator */
	return qe_minpq_insert((qe_minpq *)q, &job, 0);
}

/**
 * @brief Run at most one due worker from the work queue
 *
 * The worker at the root of the queue is inspected. If its call timestamp
 * is still ahead of ts nothing runs; otherwise it is removed and its
 * handler is called.
 *
 * @param q  : work queue
 * @param ts : current timestamp to compare against
 * @return call_ts - ts of the next worker when it is not due yet;
 *         0 when the queue is NULL or empty, or after a worker was run
 */
qe_u64 qe_workqueue_service(qe_workq *q, qe_u32 ts)
{
	unsigned int prio;
	qe_wq_worker next;

	if (!q || qe_minpq_isempty(q))
		return 0;

	qe_minpq_peek(q, &next, &prio, 1);
	if (next.call_ts > ts)
		return (next.call_ts - ts);

	qe_minpq_delmin(q, &next, &prio);
	next.handle(q, &next);
	return 0;
}

/**
 * @brief Initialize an empty ring queue on top of an external buffer
 *
 * The buffer is zeroed over elem_size * num_elems bytes.
 *
 * @param q         : ring queue to initialize
 * @param buf       : storage buffer
 * @param elem_size : element size in bytes
 * @param num_elems : maximum number of elements
 */
void qe_ringq_init(qe_ringq *q, void *buf, qe_size elem_size, qe_size num_elems)
{
    q->elem_size = elem_size;
    q->num_elems = num_elems;
    q->depth     = elem_size * num_elems;

    q->buf   = buf;
    q->count = 0;
    q->tail  = 0;
    q->head  = 0;

    qe_memset(buf, 0x0, q->depth);
}

/**
 * @brief Initialize a ring queue from an array whose entries all count as
 * queued elements
 *
 * The element count is set to num_elems and the array content is left
 * untouched.
 *
 * @param q         : ring queue to initialize
 * @param array     : array used as storage
 * @param elem_size : element size in bytes
 * @param num_elems : number of elements in the array
 */
void qe_ringq_fromarray(qe_ringq *q, void *array, qe_size elem_size, qe_size num_elems)
{
	q->elem_size = elem_size;
	q->num_elems = num_elems;
	q->depth     = elem_size * num_elems;

	q->buf   = array;
	q->count = num_elems;
	q->tail  = 0;
	q->head  = 0;
}

/**
 * @brief Create a ring queue whose storage is allocated together with the
 * queue header
 *
 * @param elem_size : element size in bytes, must be non-zero
 * @param num_elems : maximum number of elements, must be non-zero
 * @return qe_ringq* : the new queue, or QE_NULL on invalid parameters or
 *                     allocation failure
 */
qe_ringq *qe_ringq_create(qe_size elem_size, qe_size num_elems)
{	
	qe_ringq *q;

	/* A zero-sized ring would make qe_ringq_enq() take a modulo by zero */
	if (!elem_size || !num_elems) {
		return QE_NULL;
	}

	q = qe_malloc(sizeof(qe_ringq) + elem_size*num_elems);
	if (!q) {return QE_NULL;}

	q->buf       = (char *)q + sizeof(qe_ringq);
	q->head      = 0;
	q->tail      = 0;
	q->count     = 0;
	q->elem_size = elem_size;
	q->num_elems = num_elems;
	q->depth     = elem_size * num_elems;
	return q;
}

/**
 * @brief Is the ring queue empty
 *
 * @param q : ring queue
 * @return qe_bool : true when the element count is zero
 */
qe_bool qe_ringq_isempty(qe_ringq *q)
{
	return !q->count;
}

/**
 * @brief Is the ring queue full
 *
 * @param q : ring queue
 * @return qe_bool : true when the element count equals the capacity
 */
qe_bool qe_ringq_isfull(qe_ringq *q)
{
	return (q->num_elems == q->count);
}

/**
 * @brief Enqueue elements to the ring queue, overwriting the oldest ones
 * when the ring is full
 *
 * head and tail are byte offsets into the buffer. Each element is written
 * at tail; when the ring is already full the write lands on the oldest
 * element, so head is advanced and the count stays at the capacity.
 *
 * @param q    : ring queue
 * @param data : elements to enqueue, num entries of elem_size bytes
 * @param num  : elements number
 * @return qe_ok on success, qe_err_param on a NULL argument or a ring of
 *         zero size
 */
qe_ret qe_ringq_enq(qe_ringq *q, void *data, unsigned int num)
{
	char *base, *src;

	if (!q || !data || !q->elem_size || !q->num_elems) {
		return qe_err_param;
	}

	base = (char *)q->buf;
	src  = (char *)data;

	while (num--) {
		qe_memcpy(base + q->tail, src, q->elem_size);
		q->tail = (q->tail + q->elem_size) % (q->elem_size * q->num_elems);

		/* Fullness is decided from the count alone, so it also works when
		 * tail has just wrapped back to offset 0 */
		if (q->count >= q->num_elems) {
			q->head = (q->head + q->elem_size) % (q->elem_size * q->num_elems);
		} else {
			q->count++;
		}

		src += q->elem_size;
	}

	return qe_ok;
}

/**
 * @brief Dequeue elements from the ring queue
 *
 * @param q    : ring queue
 * @param data : address receiving the elements, room for num entries of
 *               elem_size bytes
 * @param num  : elements number
 * @return qe_ok on success, qe_err_range when fewer than num elements are
 *         queued
 */
qe_ret qe_ringq_deq(qe_ringq *q, void *data, unsigned int num)
{
	char *p, *d;

	if (q->count < num) {
		return qe_err_range;
	}

	d = (char *)data;

	while (num--) {

		if (qe_ringq_isempty(q))
			break;

		p = (char *)q->buf;
		qe_memcpy(d, p+q->head, q->elem_size);
		q->head += q->elem_size;

		/* head is a byte offset: wrap at the end of the buffer */
		if (q->head >= q->depth) {
			q->head = 0;
		}

		q->count--;

		/* Step to the next output slot so elements do not overwrite
		 * each other */
		d += q->elem_size;
	}

	return qe_ok;
}

/**
 * @brief Copy out the element at a given position of the ring queue
 * without removing it
 *
 * Index 0 is the oldest element. The byte offset is wrapped around the
 * end of the buffer.
 *
 * @param q     : ring queue
 * @param index : position, 0 .. number of queued elements - 1
 * @param data  : address receiving the element (elem_size bytes)
 * @return qe_ok on success, qe_err_empty when the ring is empty or holds
 *         no element at that index
 */
qe_ret qe_ringq_get(qe_ringq *q, int index, void *data)
{
	char *pos;
	
	if (qe_ringq_isempty(q) || (index < 0) || ((unsigned int)index >= q->count)) {
		return qe_err_empty;
	}

	/* count > 0 here, so the ring size used as modulus is non-zero */
	pos = (char *)q->buf +
		(q->head + (unsigned int)index * q->elem_size) % (q->elem_size * q->num_elems);
	qe_memcpy(data, pos, q->elem_size);
	return qe_ok;
}

/**
 * @brief Get the number of elements waiting in the ring queue
 *
 * @param q : ring queue
 * @return number of queued elements
 */
qe_uint qe_ringq_wait(qe_ringq *q)
{
	return (q->count);
}

/**
 * @brief Get the capacity of the ring queue
 *
 * @param q : ring queue
 * @return maximum number of elements
 */
qe_uint qe_ringq_size(qe_ringq *q)
{
	return (q->num_elems);
}

/**
 * @brief Get the number of unused element slots in the ring queue
 *
 * @param q : ring queue
 * @return capacity minus the number of queued elements
 */
qe_uint qe_ringq_free(qe_ringq *q)
{
	return q->num_elems - q->count;
}

/**
 * @brief Get the stored depth of the ring queue
 *
 * The initializers in this file set depth to elem_size * num_elems.
 *
 * @param q : ring queue
 * @return the depth field
 */
qe_uint qe_ringq_nbytes(qe_ringq *q)
{
	return (q->depth);
}

/**
 * @brief Drop all elements from the ring queue
 *
 * Only the read/write offsets and the count are reset; the buffer content
 * is not touched. A NULL queue is ignored.
 *
 * @param q : ring queue
 */
void qe_ringq_clear(qe_ringq *q)
{
	if (q) {
		q->count = 0;
		q->tail  = 0;
		q->head  = 0;
	}
}